package com.xiaohu.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/*
    Required dependencies:
        flink-connector-jdbc
        mysql-connector-java

    At present the JDBC sink can only be attached via addSink.

 */
/**
 * Demo Flink job that reads text lines from a socket and appends each line
 * as a row to a MySQL table through the Flink JDBC connector.
 *
 * <p>Every received line is bound to the single {@code ?} placeholder of
 * {@link #INSERT_SQL}, i.e. it is written to the {@code info} column of
 * {@code sink_jdbc}. No other column is set by the statement.
 */
public class JDBCSinkDemo {

    /** Host of the text socket the job reads from. */
    private static final String SOURCE_HOST = "master";

    /** Port of the text socket the job reads from. */
    private static final int SOURCE_PORT = 7777;

    /** Statement executed for every record; only the {@code info} column is supplied. */
    private static final String INSERT_SQL = "INSERT INTO sink_jdbc (info) VALUES (?)";

    // NOTE(review): the connection settings below, including the database
    // credentials, are hard-coded for this demo. Externalize them (job
    // arguments, environment, secret store) before using this anywhere real.
    private static final String JDBC_URL =
            "jdbc:mysql://master:3306/xiaohu_db?useUnicode=true&characterEncoding=utf-8&useSSL=false";
    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    private static final String JDBC_USERNAME = "root";
    private static final String JDBC_PASSWORD = "123456";

    /**
     * Builds and runs the streaming job: socket text source -> JDBC sink.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // One stream element per line of text received on the socket.
        DataStream<String> lines = env.socketTextStream(SOURCE_HOST, SOURCE_PORT);

        // Binds the received line to the single placeholder of INSERT_SQL.
        // The line is passed straight through: the statement has no other
        // parameter, so no intermediate record type is needed.
        JdbcStatementBuilder<String> bindInfo =
                (statement, line) -> statement.setString(1, line);

        // Write behaviour: flush once 1000 records are buffered or after
        // 200 ms, whichever comes first; retry a failed write up to 5 times.
        // NOTE(review): a plain INSERT is not idempotent, so a retried batch
        // could presumably produce duplicate rows - confirm against the
        // connector's delivery guarantees if that matters.
        JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build();

        // Connection settings; the builder is a nested class of JdbcConnectionOptions.
        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(JDBC_URL)
                .withDriverName(JDBC_DRIVER)
                .withUsername(JDBC_USERNAME)
                .withPassword(JDBC_PASSWORD)
                .withConnectionCheckTimeoutSeconds(60) // timeout, in seconds, for the connection check
                .build();

        lines.addSink(JdbcSink.sink(INSERT_SQL, bindInfo, executionOptions, connectionOptions));

        // Submit the job and block until it finishes.
        env.execute("Write to MySQL Example");
    }
}
